require 'json'
require 'rake/clean'
require 'set'

# Usage:
#  * Start heavydb as:
#    bin/heavydb --allowed-import-paths='["path/to/dir/of/test/files"]' storage
#  * Start postgresql (or set SKIP_PG=1)
#  * When running rake, set environment variable HEAVYSQL="path/to/heavysql -p admin_password"

# PostgreSQL requirements, unless SKIP_PG=1:
#  * `CMD -U postgres` is assumed to run with superuser privileges
#    for CMD in {createuser, createdb, dropuser, dropdb}.

# Tasks should be idempotent. Add tests if needed.

SCALE = ENV['SCALE'] || '1'
RNGSEED = ENV['RNGSEED']
SKIP_PG = ENV['SKIP_PG'] && ENV['SKIP_PG'] != '0' # Skip PostgreSQL tasks. Requires *-pg.* and *.lz4 files.
HEAVYSQL = ENV['HEAVYSQL'] # "path/to/heavysql -p admin_password"
PSQL = 'psql -U' # followed by dbname=username

namespace :tpcds do
  # TPC-DS toolkit archive (looked up relative to the working directory) and its expected checksum.
  input_zip = 'TPC-DS_Tools_v3.2.0.zip'
  input_sha256 = 'bcdf0b75f123e74bfb71186d48d301cecdd58465621e5fbd3d39f6d759045ec0'
  # The archive is unpacked into a directory named after the zip file, minus its extension.
  input_dir = File.basename(input_zip, '.zip')
  # Generated schema, data, query and result files are collected per scale factor.
  output_dir = [input_dir, "#{SCALE}GB"].join('_')
  # PostgreSQL username and database to generate query results
  dbname = "tpcds_" + "#{SCALE}gb"
  # Skip queries that take too long
  skip_queries = Set[47, 57]
  # Query numbers that are generated, captured and compared.
  supported = 1..99

  directory output_dir
  CLOBBER << output_dir

  # Unpack the toolkit: check the archive checksum, unzip it into input_dir and
  # hoist the contents of the archive's single wrapper directory up one level.
  directory input_dir do
    checksum_line = "#{input_sha256}  #{input_zip}"
    sh "echo '#{checksum_line}' | sha256sum -c"
    sh(*['unzip', input_zip, '-d', input_dir])
    # sh '7z', 'x', "-o#{input_dir}", input_zip  # TPC-DS_Tools_v3.1.0.zip was a 7z file.
    # First directory entry other than '.' and '..' is taken to be the wrapper directory.
    wrapper = Dir.new(input_dir).find { |entry| entry !~ /^\.\.?$/ }
    nested = "#{input_dir}/#{wrapper}"
    mv Dir.glob("#{nested}/*"), input_dir
    rmdir nested
  end
  CLEAN << input_dir

  # Repair tools/*.c files by prefixing extern to some variable declarations.
  # The generated unified diff is accumulated in tools.patch and then applied.
  tools_patch = "#{input_dir}/tools.patch"
  file tools_patch => input_dir do
    # Source file (under tools/) => declaration at line start that must become extern.
    extern_fixes = {
      'QgenMain.c' => 'int yydebug;',
      's_catalog_order.c' => 'int nItemIndex;',
      's_catalog_page.c' => 'struct CATALOG_PAGE_TBL g_w_catalog_page;',
      's_purchase.c' => 'int nItemIndex;',
      's_warehouse.c' => 'struct W_WAREHOUSE_TBL g_w_warehouse;',
      's_web_order_lineitem.c' => 'struct S_WEB_ORDER_LINEITEM_TBL g_s_web_order_lineitem;',
      's_web_site.c' => 'struct W_WEB_SITE_TBL g_w_web_site;'
    }
    cd input_dir do
      patch_name = File.basename tools_patch
      extern_fixes.each do |source, declaration|
        target = "tools/#{source}"
        sh "sed -e 's/^#{declaration}/extern \\0/' #{target} | diff -u #{target} - >> #{patch_name}" do |ok, res|
          # diff exits with status 1 exactly when it found differences; anything else is unexpected.
          raise "Expected at least one change to #{target}" unless !ok && res.exitstatus == 1
        end
      end
      sh 'patch', '-p0', '-i', patch_name
    end
  end

  # Generate patch file to repair query templates and make compatible w/ PostgreSQL and HeavyDB.
  # Avoid duplicate column names for use in CREATE VIEW temp_view AS ... to easily determine column types.
  #
  # For every query_templates/query*.tpl (visited in numeric order) a list of sed rules is chosen,
  # the difference between the original and the sed output is appended to queries.patch, and the
  # accumulated patch is applied once all templates have been processed.
  queries_patch = "#{input_dir}/queries.patch"
  file queries_patch => input_dir do
    cd input_dir do
      patch = File.basename queries_patch
      # Extracts the query number from a template file name, e.g. 'query12.tpl' => 12.
      query_num = lambda { |f| f[/query(\d+).tpl$/, 1].to_i }
      Dir.glob('query_templates/query*.tpl').sort_by(&query_num).each do |f|
        n = query_num.call f
        # Template-specific sed rules; templates without a special case only get the date arithmetic fix.
        sedrules = if n == 2 # Fix ERROR:  subquery in FROM must have an alias
          [ 's/))/) x)/', 's/round(\(...\).*/\0 \1/' ]
        elsif n == 13 # Columns must have unique names in order to create a view based on the query.
          [ 's/avg(ss_ext_sales_price)/\0 avg_price/', 's/avg(ss_ext_wholesale_cost)/\0 avg_cost/' ]
        elsif n == 14 # query14.tpl has 2 separate queries. Delete the 1st.
          [ '39,140d' ]
        elsif n == 23 # query23.tpl has 2 separate queries. Delete the 1st.
          # Fix ERROR:  subquery in FROM must have an alias
          [ '40,89d', 's/c_customer_sk)/\0 x/', 's/c_first_name)/\0 y/' ]
        elsif n == 24 # query24.tpl has 2 separate queries. Delete the 1st.
          #[ '/^;/,$ { /^\([^;]\|$\)/,$d }' ]
          [ 's/MARKET=random(5,10,uniform)/MARKET=3/', '40,92d' ]
        elsif n == 30 # Column 'c_last_review_date_sk' not found in any table.
          [ 's/\(c_last_review_date\)_sk/\1/' ]
        elsif n == 31 # Make order unique.
          [ '/define AGG=/s/text(.*);/text({"ss1.ca_county",1});/' ]
        elsif n == 35 # ERROR:  column "avg" specified more than once
          [ 's/\[\(AGG\w+\)\](cd_dep\(_\w+\))/\0 \1\2/' ]
        elsif n == 59 # Name unnamed columns for view.
          [ 's/\(..._sales\)1\/\12/\0 \1_ratio/g' ]
        elsif n == 64 # Avoid duplicate column names
          [ 's/\(cs1\.\(cnt\|syear\)\)$/\1 s1\2/' ]
        elsif n == 77 # Fix ERROR:  syntax error at or near "returns"
          [ 's/\(coalesce(returns, 0)\) \(returns\)/\1 as \2/',
            's/\([0-9]\{1,\}\) days)/interval \'\1\' day)/' ]
        elsif n == 85 # Give columns unique names for CREATE VIEW
          [ '0,/ws_quantity/s/\(avg(\(ws_quantity\))\)/\1 as avg_\2/',
            '0,/wr_refunded_cash/s/\(avg(\(wr_refunded_cash\))\)/\1 as avg_\2/',
            '0,/wr_fee/s/\(avg(\(wr_fee\))\)/\1 as avg_\2/' ]
        else # Make date arithmetic compatible with PostgreSQL and HeavyDB.
          [ 's/\([0-9]\{1,\}\) days)/interval \'\1\' day)/' ]
        end
        # Use IO.popen to avoid complexities with quoting+escaping sed rules.
        # Pipeline: sed output -> stdin of `diff -u <template> -` -> appended to the patch file.
        IO.popen ['sed'] + sedrules.map { |rule| ['-e', rule] }.flatten + [f] do |sed|
          IO.popen ['diff', '-u', f, '-'], 'r+' do |diff|
            IO.copy_stream sed, diff
            diff.close_write  # signal end of input so diff can emit its output
            File.open(patch, 'a') { |patch_io| IO.copy_stream diff, patch_io }
          end
        end
      end
      # Apply all collected template changes in one go.
      sh 'patch', '-p0', '-i', patch
      # Fix ERROR: Substitution'_END' is used before being initialized
      sh 'echo \'define _END="";\' >> query_templates/netezza.tpl'
    end
  end

  # Build the toolkit binaries once the C sources have been patched. Also compiles dsdgen.
  file "#{input_dir}/tools/dsqgen" => tools_patch do
    # make is run twice: the 2nd run prevents dsqgen from falsely detecting
    # syntax errors in the dialect files.
    cd("#{input_dir}/tools") { 2.times { sh 'make' } }
  end

  # Copy the stock schema file into the output directory.
  file "#{output_dir}/tpcds.sql" => [ input_dir, output_dir ] do
    schema = File.join(input_dir, 'tools', 'tpcds.sql')
    # Deliberately no `preserve: true`: an old timestamp causes the task to repeat.
    cp schema, output_dir
  end

  # Fix ALTER TABLE commands
  # Copies tools/tpcds_ri.sql into the output directory, writes the sed-generated corrections
  # to tpcds_ri.patch next to it, and applies that patch to the copy.
  file "#{output_dir}/tpcds_ri.sql" => [ input_dir, output_dir ] do |f|
    cp "#{input_dir}/tools/tpcds_ri.sql", output_dir
    patch = f.name.sub(/sql$/, 'patch')
    # Each rule is annotated with the PostgreSQL error it avoids.
    sedrules = [ '/cp_promo_id/d', # ERROR:  column "cp_promo_id" referenced in foreign key constraint does not exist
      '0,/ cr_i /s/ cr_i / cr_i1 /',
      '0,/ cr_i /s/ cr_i / cr_i2 /', # ERROR:  constraint "cr_i" for relation "catalog_returns" already exists
      '/cr_ship_date_sk/d', # ERROR:  column "cr_ship_date_sk" referenced in foreign key constraint does not exist
      '0,/ wr_ret_cd /!s/ wr_ret_cd / wr_ret_hd /', # ERROR:  constraint "wr_ret_cd" for relation "web_returns" already exists
      '0,/ ws_b_cd /!s/ ws_b_cd / ws_b_hd /' ] # ERROR:  constraint "ws_b_cd" for relation "web_sales" already exists
    # Pipeline: sed output -> stdin of `diff -u <file> -` -> patch file (overwritten, not appended).
    IO.popen ['sed'] + sedrules.map { |rule| ['-e', rule] }.flatten + [f.name] do |sed|
      IO.popen ['diff', '-u', f.name, '-'], 'r+' do |diff|
        IO.copy_stream sed, diff
        diff.close_write  # signal end of input so diff can emit its output
        File.open(patch, 'w') { |patch_io| IO.copy_stream diff, patch_io }
      end
    end
    # patch runs inside output_dir (-d), so -p1 strips the directory from the diff header
    # and the patch file is referenced by its base name.
    sh 'patch', '-p1', '-d', output_dir, '-i', File.basename(patch)
  end

  # Derive the HeavyDB schema from tools/tpcds.sql by dropping comment lines
  # and primary key clauses.
  file "#{output_dir}/tpcds-hv.sql" => [ input_dir, output_dir ] do |f|
    ddl = File.read(File.join(input_dir, 'tools', 'tpcds.sql'))
    without_comments = ddl.gsub(/^--.*\n/, '')
    without_keys = without_comments.gsub(/\s*,\s*primary key\b.*?$/m, '')
    File.write f.name, without_keys
  end

  unless SKIP_PG
    # Generate SQL query files
    # dsqgen writes query_0.sql into the tools directory; it is moved to the requested target.
    rule %r{#{output_dir}/query\d+\.sql} => [queries_patch, "#{input_dir}/tools/dsqgen", output_dir] do |f|
      # Lowest positive RNGSEED for query to not produce empty result for SCALE=1.
      # Queries not listed here use seed '1'.
      seed_overrides = Hash.new('1').update('25' => '2', '29' => '2', '37' => '4', '41' => '10')
      query_id = f.name[/query(\d+)\.sql$/,1]
      template = File.basename(f.name,'.sql')+'.tpl'
      dsqgen_options = [
        '-DIALECT', 'netezza',
        '-DIRECTORY', '../query_templates',
        '-QUALIFY', 'Y',
        '-RNGSEED', RNGSEED || seed_overrides[query_id],  # an explicit RNGSEED wins over the table
        '-SCALE', SCALE,
        '-TEMPLATE', template
      ]
      cd("#{input_dir}/tools") { sh './dsqgen', *dsqgen_options }
      mv "#{input_dir}/tools/query_0.sql", f.name
    end
  end

  # desc 'Generate table data files: call_center.dat, catalog_page.dat, ..., web_site.dat'
  # web_site.dat stands in for the whole set of generated data files.
  table_data = "#{output_dir}/web_site.dat"
  file table_data => [ "#{input_dir}/tools/dsqgen", output_dir ] do
    dsdgen_options = [
      '-DIR', "../../#{output_dir}",  # relative to the tools directory dsdgen runs in
      '-RNGSEED', RNGSEED || '1',
      '-SCALE', SCALE,
      '-TERMINATE', 'N',  # exclude terminating delimiter from each line
      '-VERBOSE', 'Y'
    ]
    cd("#{input_dir}/tools") { sh './dsdgen', *dsdgen_options }
  end

  # Namespaced hv_setup/pg_setup tasks: each runs the shared top-level task of the
  # same name, handing it this namespace's database name.
  [ :hv_setup, :pg_setup ].each do |shared|
    task shared do
      Rake::Task[shared].execute dbname
    end
  end

  desc "Drop HeavyDB database #{dbname}"
  task :hv_clean do
    drop = "echo 'DROP DATABASE #{dbname};' | #{HEAVYSQL}"
    sh(drop) { |_ok, _status| }  # empty block: a failing command does not abort rake
  end

  desc "Drop PostgreSQL database and username #{dbname}"
  task :pg_clean do
    # A failing dropdb is ignored (empty block); a failing dropuser aborts the task.
    sh('dropdb', '-U', 'postgres', dbname) { |_ok, _status| }
    sh 'dropuser', '-U', 'postgres', dbname
  end

  # Create the HeavyDB tables unless store_sales can already be queried.
  task :hv_create_tables => [ :hv_setup, "#{output_dir}/tpcds-hv.sql" ] do
    probe = "echo 'SELECT 1 FROM store_sales LIMIT 1;' | #{HEAVYSQL} -q #{dbname}"
    sh probe do |exists, _status|
      unless exists
        sh "#{HEAVYSQL} #{dbname} < #{output_dir}/tpcds-hv.sql"
        sh probe  # run without a block: aborts if the table is still missing
      end
    end
  end

  # Create the PostgreSQL tables unless store_sales can already be queried.
  task :pg_create_tables => [ :pg_setup, "#{output_dir}/tpcds.sql" ] do
    probe = "#{PSQL} #{dbname} -tc 'SELECT 1 FROM store_sales LIMIT 1'"
    sh probe do |exists, _status|
      unless exists
        sh "#{PSQL} #{dbname} < #{output_dir}/tpcds.sql"
        sh probe  # run without a block: aborts if the table is still missing
      end
    end
  end

  # Load the generated *.dat files into HeavyDB, smallest file first.
  task :hv_insert => [table_data, :hv_create_tables] do
    # store_sales is the last table imported into, so a row in it means the import completed.
    probe = "echo 'SELECT 1 FROM store_sales LIMIT 1;' | #{HEAVYSQL} -q #{dbname}"
    loaded = lambda { `#{probe}`.to_i == 1 }
    unless loaded.call
      data_files = Dir.glob("#{output_dir}/*.dat").sort_by { |path| File.size path }
      data_files.each do |path|
        table = File.basename path, '.dat'
        source = File.absolute_path path
        copy = "COPY #{table} FROM '#{source}' WITH (DELIMITER='|', HEADER='false');"
        sh "echo \"#{copy}\" | #{HEAVYSQL} #{dbname}"
      end
      exit 1 unless loaded.call
    end
  end

  # Restore the tables from *.lz4 dumps instead of importing raw data, smallest file first.
  task :hv_restore => :hv_setup do
    # store_sales is the last table imported into, so a row in it means the restore completed.
    probe = "echo 'SELECT 1 FROM store_sales LIMIT 1;' | #{HEAVYSQL} -q #{dbname}"
    loaded = lambda { `#{probe}`.to_i == 1 }
    unless loaded.call
      dumps = Dir.glob("#{output_dir}/*.lz4").sort_by { |path| File.size path }
      dumps.each do |path|
        table = File.basename path, '.lz4'
        source = File.absolute_path path
        restore = "RESTORE TABLE #{table} FROM '#{source}' WITH (COMPRESSION='lz4');"
        sh "echo \"#{restore}\" | #{HEAVYSQL} #{dbname}"
      end
      exit 1 unless loaded.call
    end
  end

  # Load the generated *.dat files into PostgreSQL, smallest file first.
  task :pg_insert => [ table_data, :pg_create_tables ] do
    # store_sales is the last table imported into, so a row in it means the import completed.
    probe = "#{PSQL} #{dbname} -tc 'SELECT 1 FROM store_sales LIMIT 1'"
    loaded = lambda { `#{probe}`.to_i == 1 }
    unless loaded.call
      data_files = Dir.glob("#{output_dir}/*.dat").sort_by { |path| File.size path }
      data_files.each do |path|
        table = File.basename path, '.dat'
        copy = "SET CLIENT_ENCODING='LATIN1'; COPY #{table} FROM STDIN WITH (DELIMITER '|', NULL '')"
        sh "#{PSQL} #{dbname} -c \"#{copy}\" < #{path}"
      end
      exit 1 unless loaded.call
    end
  end

  # Apply the referential integrity constraints unless web_d2 already shows up on web_site.
  task :pg_constraints => [ :pg_insert, "#{output_dir}/tpcds_ri.sql" ] do
    probe = "#{PSQL} #{dbname} -c '\\d web_site' | grep -c web_d2"
    constrained = lambda { `#{probe}`.to_i == 1 }
    unless constrained.call
      sh "#{PSQL} #{dbname} < #{output_dir}/tpcds_ri.sql"
      exit 1 unless constrained.call
    end
  end

  # Capture output of HeavyDB queries
  # The data import (hv_restore when SKIP_PG is set, hv_insert otherwise) is invoked from the
  # rule body. When the query is skipped or the command fails, a failure report is written
  # to the query's .json file.
  rule %r{#{output_dir}/query\d+-hv\.txt} => [ ->(f) { f.sub(/-hv\.txt$/, '.sql') } ] do |f|
    import_task = SKIP_PG ? 'tpcds:hv_restore' : 'tpcds:hv_insert'
    Rake::Task[import_task].invoke
    puts "Rule #{f.name}"
    query_id = f.name[/query(\d+)-hv\.txt$/,1].to_i
    skip = skip_queries.include? query_id
    cmd = if skip
      "echo 'Query skipped due to long execution time.' | tee #{f.name}"
    else
      "printf '\\\\timing\\n' | cat - #{f.source} | #{HEAVYSQL} #{dbname} > #{f.name}"
    end
    # stderr is merged into the pipe; stdout of the query command itself goes to the target file.
    captured = IO.popen(cmd, err: [:child, :out], &:read)
    next unless skip || !$?.exitstatus.zero?
    failure = { success: false, message: captured }
    File.open(f.name.sub(/-hv\.txt$/, '.json'), 'w') { |json| json.puts failure.to_json }
  end

  unless SKIP_PG
    # Capture output of PostgreSQL queries
    # The import including constraints (tpcds:pg_constraints) is invoked from the rule body.
    rule %r{#{output_dir}/query\d+-pg\.txt} => [ ->(f) { f.sub(/-pg\.txt$/, '.sql') } ] do |f|
      Rake::Task['tpcds:pg_constraints'].invoke
      puts "Rule #{f.name}"
      query_id = f.name[/query(\d+)-pg\.txt$/,1].to_i
      if skip_queries.include? query_id
        sh "echo 'Query skipped due to long execution time.' | tee #{f.name}"
      else
        # Unaligned output with NULL spelled out.
        sh "printf '\\\\timing\\n' | cat - #{f.source} | #{PSQL} #{dbname} -A -P null=NULL > #{f.name}"
      end
    end
  end

  # Compare results for an individual query and generate a json report.
  # The report records success, a message (summary, row count mismatch or the list of
  # differing cells), the HeavyDB row count and the HeavyDB execution time.
  rule '.json' => [ ->(f) { f.sub(/\.json$/, '-pg.txt') }, ->(f) { f.sub(/\.json$/, '-hv.txt') } ] do |f|
    pg = PgResults.new f.sources[0]
    columns = PgColumns.new f.name.sub(/json$/,'sql'), dbname
    hv = HvResults.new f.sources[1]
    report = { success: false }
    if pg.rows.size != hv.rows.size
      report[:message] = "PostgreSQL number of rows(#{pg.rows.size}) not equal to HeavyDB(#{hv.rows.size})."
    else
      # One entry per cell that differs, in row-major order.
      mismatches = pg.rows.each_index.flat_map do |row|
        differing = columns.types.each_index.reject { |col| columns.equal?(row, col, pg, hv) }
        differing.map do |col|
          "#{columns.types[col]}[#{row}][#{col}]: pg(#{pg.rows[row][col]}) hv(#{hv.rows[row][col]})"
        end
      end
      report[:success] = mismatches.empty?
      report[:message] = if mismatches.empty?
        "#{pg.rows.size} row#{pg.rows.size == 1 ? '' : 's'} in #{hv.time_ms} ms."
      else
        mismatches.join('\n')  # literal backslash-n separator
      end
    end
    report[:nrows] = hv.rows.size
    report[:time_ms] = hv.time_ms
    File.open(f.name, 'w') { |json| json.puts report.to_json }
  end

  # Builds the per-query file names in output_dir for a given suffix, e.g. '.json' or '-hv.txt'.
  query_files = lambda { |suffix| supported.map { |n| "#{output_dir}/query#{n}#{suffix}" } }

  # Combine all per-query json reports into a single HTML page.
  rule "#{output_dir}/report.html" => query_files.call('.json') do |f|
    reporter = Reporter.new f.sources
    File.open(f.name, 'w') { |html| reporter.write_to html }
  end

  desc "Import data into HeavyDB database #{dbname}."
  task :hv_import => [ :hv_insert ]

  desc "Import data into PostgreSQL database #{dbname}."
  task :pg_import => [ :pg_constraints ]

  desc 'Generate SELECT query files: query1.sql, query2.sql, ..., query99.sql'
  task :queries => query_files.call('.sql')

  desc 'Capture output of SELECT query files into query1-hv.txt, query2-hv.txt, ..., query99-hv.txt'
  task :hv_capture => [ :hv_import ] + query_files.call('-hv.txt')

  desc 'Capture output of SELECT query files into query1-pg.txt, query2-pg.txt, ..., query99-pg.txt'
  task :pg_capture => [ :pg_import ] + query_files.call('-pg.txt')

  desc "Run all TPC-DS queries on PostgreSQL and HeavyDB and compare the results."
  task :compare => "#{output_dir}/report.html"
end  # namespace :tpcds

namespace :tpch do
  # TPC-H toolkit archive (looked up relative to the working directory) and its expected checksum.
  input_zip = 'TPC-H_Tools_v3.0.0.zip'
  input_sha256 = '1e1d8f194dfe166c8b4f97699b0cbe1b7fa80736560231776ce2fe3611d8c3af'
  # Toolkit directory name: the zip file name minus its extension.
  input_dir = File.basename(input_zip, '.zip')
  # Generated schema, data, query and result files are collected per scale factor.
  output_dir = [input_dir, "#{SCALE}GB"].join('_')
  # PostgreSQL username and database to generate query results
  dbname = "tpch_" + "#{SCALE}gb"

  directory output_dir
  CLOBBER << output_dir

  # Unpack the toolkit after checking the archive checksum.
  directory input_dir do
    checksum_line = "#{input_sha256}  #{input_zip}"
    sh "echo '#{checksum_line}' | sha256sum -c"
    # TODO Remove/replace 3.0.0-specific code
    # tpc-h_v3.0.0.* are excluded because they are placed in INPUT_DIR anyway
    sh(*['unzip', input_zip, '-x', 'tpc-h_v3.0.0.*'])
    # sh '7z', 'x', '-xtpc-h_v3.0.0.*', input_zip
  end
  CLEAN << input_dir

  # Build the toolkit binaries: fill in the blank settings of makefile.suite, save the
  # result as Makefile and run make.
  file "#{input_dir}/dbgen/qgen" => input_dir do |_f|
    # Line pattern of the makefile variable => value appended after its '='.
    settings = { '^CC' => 'gcc', '^DATABASE' => 'INFORMIX', 'MACHINE' => 'LINUX', 'WORKLOAD' => 'TPCH' }
    script = settings.map { |variable, value| "'s/#{variable}\\s*=/\\0 #{value}/'" }.join(' -e ')
    cd "#{input_dir}/dbgen" do
      sh "sed -Ee #{script} makefile.suite > Makefile"
      sh 'make'
    end
  end

  # Generate the 22 queries with qgen into dbgen/query<n>.sql, then record and apply the
  # modifications needed to run them as a unified diff in dbgen/queries.patch.
  queries_patch = "#{input_dir}/dbgen/queries.patch"
  file queries_patch => "#{input_dir}/dbgen/qgen" do
    # qgen's -r option takes an argument. Interpolating an unset RNGSEED (nil) left `-r`
    # without a value so that it consumed the following `-s`; default to seed 1 like the
    # TPC-DS tasks do.
    rngseed = RNGSEED || '1'
    cd "#{input_dir}/dbgen" do
      patch = File.basename queries_patch
      # Query numbers are taken from the template file names and processed in numeric order.
      Dir.glob('queries/*.sql').map { |q| "#{/\d+/.match q}".to_i }.sort.each do |n|
        sql = `DSS_QUERY=queries ./qgen -r #{rngseed} -s #{SCALE} #{n}`
        sql.gsub!(/\r/, '')  # remove carriage returns
        file = "query#{n}.sql"
        File.write file, sql
        # Create patch file of query modifications
        sql.gsub!(/^(--.*|\s*)\n/, '')  # remove comments and blank lines
        if n == 1
          sql.sub! 'day (3)', 'day'
        elsif n == 11 # quote reserved word "value"
          sql.gsub! 'value', '"value"'
        elsif n == 14 # prevent overflow of DECIMAL(15,2)
          sql.sub! '100.00', 'CAST(100.00 AS DOUBLE)'
        elsif n == 15 # replace view with CTE
          sql.sub!(/create view (.+)/, 'with \1 (')
          sql.sub!(/(?<=l_suppkey);/, ')')
          sql.sub!(/drop view revenue0;\s*/m, '')
        end
        # Turn a trailing 'FIRST n' into a LIMIT clause; 'FIRST -1' means no limit.
        sql.sub!(/;\s+FIRST (-?\d+)\Z/) do
          $1 == '-1' ? ";\n" : "\nLIMIT #{$1};\n"
        end
        # Append the difference between the generated file and the modified text to the patch.
        IO.popen ['diff', '-u', file, '-'], 'r+' do |diff|
          diff.write sql
          diff.close_write  # signal end of input so diff can emit its output
          File.open(patch, 'a') { |patch_io| IO.copy_stream diff, patch_io }
        end
      end
      sh 'patch', '-p0', '-i', patch
    end
  end

  # Copy a patched query from the dbgen directory into the output directory.
  rule %r{#{output_dir}/query\d+\.sql} => [ output_dir, queries_patch ] do |f|
    patched = f.name.sub(output_dir, "#{input_dir}/dbgen")
    cp patched, f.name
  end

  desc 'Generate SELECT query files: query1.sql, query2.sql, ..., query22.sql'
  task :queries => 1.upto(22).map { |n| "#{output_dir}/query#{n}.sql" }

  # Namespaced hv_setup/pg_setup tasks: each runs the shared top-level task of the
  # same name, handing it this namespace's database name.
  [ :hv_setup, :pg_setup ].each do |shared|
    task shared do
      Rake::Task[shared].execute dbname
    end
  end

  desc "Drop PostgreSQL database and username #{dbname}"
  task :pg_clean do
    # A failing dropdb is ignored (empty block); a failing dropuser aborts the task.
    sh('dropdb', '-U', 'postgres', dbname) { |_ok, _status| }
    sh 'dropuser', '-U', 'postgres', dbname
  end

  desc "Drop HeavyDB database #{dbname}"
  task :hv_clean do
    drop = "echo 'DROP DATABASE #{dbname};' | #{HEAVYSQL}"
    sh(drop) { |_ok, _status| }  # empty block: a failing command does not abort rake
  end

  # Schema file: dbgen/dss.ddl with comment lines blanked out.
  # output_dir is a prerequisite because the shell redirection below fails when the
  # directory does not exist yet (e.g. when tpch:pg_create_tables is run on its own).
  file "#{output_dir}/create_tables.sql" => [ input_dir, output_dir ] do |f|
    sh "sed -e 's/^--.*$//' #{input_dir}/dbgen/dss.ddl > #{f.name}"
  end

  # Create the PostgreSQL tables unless lineitem can already be queried.
  task :pg_create_tables => [ :pg_setup, "#{output_dir}/create_tables.sql" ] do
    probe = "#{PSQL} #{dbname} -c 'SELECT 1 FROM lineitem LIMIT 1'"
    sh probe do |exists, _status|
      unless exists
        sh "#{PSQL} #{dbname} < #{output_dir}/create_tables.sql"
        sh probe  # run without a block: aborts if the table is still missing
      end
    end
  end

  # Create the HeavyDB tables unless lineitem can already be queried.
  task :create_tables_hv => [ :hv_setup, "#{output_dir}/create_tables.sql" ] do
    probe = "echo 'SELECT 1 FROM lineitem LIMIT 1;' | #{HEAVYSQL} #{dbname}"
    sh probe do |exists, _status|
      unless exists
        sh "#{HEAVYSQL} #{dbname} < #{output_dir}/create_tables.sql"
        sh probe  # run without a block: aborts if the table is still missing
      end
    end
  end

  # desc 'Generate table data files: supplier.tbl, region.tbl, ..., customer.tbl'
  # customer.tbl stands in for the whole set of generated data files. dbgen writes the
  # *.tbl files into its own directory; they are cleaned up in place and then moved to
  # the output directory.
  table_data = "#{output_dir}/customer.tbl"
  file table_data => [ "#{input_dir}/dbgen/qgen", output_dir ] do
    cd "#{input_dir}/dbgen" do
      sh './dbgen', '-v', '-s', SCALE
      # Remove trailing delimiter from each record.
      # `-i -e` rather than `-ie`: GNU sed reads `-ie` as "in place with backup suffix e",
      # which leaves a full *.tble copy of every data file behind.
      sh "sed -i -e 's/|$//' *.tbl"
    end
    mv Dir.glob("#{input_dir}/dbgen/*.tbl"), output_dir
  end

  # Load the generated *.tbl files into PostgreSQL, smallest file first.
  task :pg_insert => [ table_data, :pg_create_tables ] do
    # lineitem is the last table imported into, so a row in it means the import completed.
    probe = "#{PSQL} #{dbname} -tc 'SELECT 1 FROM lineitem LIMIT 1'"
    loaded = lambda { `#{probe}`.to_i == 1 }
    unless loaded.call
      data_files = Dir.glob("#{output_dir}/*.tbl").sort_by { |path| File.size path }
      data_files.each do |path|
        table = File.basename path, '.tbl'
        #copy = "COPY #{table} FROM STDIN WITH (DELIMITER '|', NULL '', ENCODING 'LATIN1')"
        copy = "SET CLIENT_ENCODING='LATIN1'; COPY #{table} FROM STDIN WITH (DELIMITER '|', NULL '')"
        sh "#{PSQL} #{dbname} -c \"#{copy}\" < #{path}"
      end
      exit 1 unless loaded.call
    end
  end

  # Load the generated *.tbl files into HeavyDB, smallest file first.
  task :insert_hv => [ table_data, :create_tables_hv ] do
    # lineitem is the last table imported into, so a row in it means the import completed.
    probe = "echo 'SELECT 1 FROM lineitem LIMIT 1;' | #{HEAVYSQL} -nq #{dbname}"
    loaded = lambda { `#{probe}`.to_i == 1 }
    unless loaded.call
      data_files = Dir.glob("#{output_dir}/*.tbl").sort_by { |path| File.size path }
      data_files.each do |path|
        table = File.basename path, '.tbl'
        source = "#{Dir.pwd}/#{path}"  # the COPY statement is given a path under the current directory
        copy = "COPY #{table} FROM '#{source}' WITH (delimiter='|', header='false', quoted='false');"
        sh "echo \"#{copy}\" | #{HEAVYSQL} #{dbname}"
      end
      exit 1 unless loaded.call
    end
  end

  # Constraint script derived from dbgen/dss.ri: CONNECT/COMMIT lines are commented out,
  # the TPCD. schema prefix is dropped and 'FOREIGN KEY name' becomes 'CONSTRAINT name FOREIGN KEY'.
  # output_dir is a prerequisite because the shell redirection below fails when the
  # directory does not exist yet.
  file "#{output_dir}/add_indexes.sql" => [ input_dir, output_dir ] do |f|
    subs = [ 's/^(CONNECT|COMMIT)/--\0/', 's/\bTPCD\.(\w+)/\1/g', 's/(FOREIGN KEY) (\w+)/CONSTRAINT \2 \1/' ]
    sh "sed -Ee '#{subs.join "' -e '"}' #{input_dir}/dbgen/dss.ri > #{f.name}"
  end

  # Apply the constraint script unless lineitem_fk2 already shows up on lineitem.
  task :pg_constraints => [ :pg_insert, "#{output_dir}/add_indexes.sql" ] do
    probe = "#{PSQL} #{dbname} -c '\\d lineitem' | grep -c lineitem_fk2"
    constrained = lambda { `#{probe}`.to_i == 1 }
    unless constrained.call
      sh "#{PSQL} #{dbname} < #{output_dir}/add_indexes.sql"
      exit 1 unless constrained.call
    end
  end

  # Builds the per-query file names in output_dir for a given suffix.
  query_outputs = lambda { |suffix| 1.upto(22).map { |n| "#{output_dir}/query#{n}#{suffix}" } }

  # Capture output of SQL queries
  rule %r{#{output_dir}/query\d+\.txt} => [ ->(f) { f.sub(/\.txt$/, '.sql') } ] do |f|
    sh "#{PSQL} #{dbname} < #{f.source} > #{f.name}"
  end

  desc 'Capture output of SELECT query files into query1.txt, query2.txt, ..., query22.txt'
  task :pg_capture => [ :pg_constraints ] + query_outputs.call('.txt')

  # Capture output of HeavyDB queries
  rule %r{#{output_dir}/query\d+-hv\.txt} => [ ->(f) { f.sub(/-hv\.txt$/, '.sql') } ] do |f|
    capture = "#{HEAVYSQL} -qt #{dbname} < #{f.source} > #{f.name}"
    sh(capture) { |_ok, _status| }  # empty block: a failing query does not abort rake
  end

  desc 'Capture output of SELECT query files into query1-hv.txt, query2-hv.txt, ..., query22-hv.txt'
  task :hv_capture => [ :insert_hv ] + query_outputs.call('-hv.txt')
end  # namespace :tpch

### Shared tasks. The first block parameter is always the task object, which rake passes in
### automatically; the second receives the value handed to execute (the database name).

# Assumes heavydb is running.
# Creates the HeavyDB database named by the second block argument unless heavysql can
# already connect to it; exits with status 1 if it still cannot connect afterwards.
task :hv_setup do |_task, dbname|
  # Test for existence/connectivity of user and database
  probe = "printf '\\\\q\\n' | #{HEAVYSQL} #{dbname}"
  connected = lambda { `#{probe}`.include?('connected to database') }
  unless connected.call
    sh "echo 'CREATE DATABASE #{dbname};' | #{HEAVYSQL}"
    exit 1 unless connected.call
  end
end

# Assumes PostgreSQL is running.
# Creates a PostgreSQL user and a database it owns, both named by the second block
# argument, unless psql can already connect with them.
task :pg_setup do |_task, dbname|
  # Test for existence/connectivity of user and database
  probe = "#{PSQL} #{dbname} -c '\\q'"
  sh probe do |reachable, _status|
    unless reachable
      # Failures of createuser and dropdb are ignored (empty blocks).
      sh('createuser', '-U', 'postgres', dbname) { |_ok, _res| }
      sh('dropdb', '-U', 'postgres', dbname) { |_ok, _res| }
      sh 'createdb', '-U', 'postgres', '-O', dbname, dbname
      sh probe  # run without a block: aborts if the connection still fails
    end
  end
end

# Parse HeavyDB query output.
#
# Expected file layout: a connection message, the header line, the data rows, a row count
# line (absent when the header is 'No rows returned.'), the timing line and a disconnection
# message. Exposes the data rows (arrays of '|'-separated fields) and the execution time in ms.
class HvResults
  attr_reader :rows, :time_ms

  # Reads and validates +filename+; raises RuntimeError when the layout is not as expected.
  def initialize filename
    puts "Parsing filename(#{filename})."
    # TPC-DS is in LATIN1 but PostgreSQL outputs in UTF-8 so convert to utf-8 for comparison.
    text = File.readlines(filename).map { |raw| raw.chomp.encode 'utf-8', 'iso-8859-1' }
    greeting = text.shift
    raise 'Connection message expected for HeavyDB.' unless greeting =~ /^User .*? connected to database .*$/
    @header = text.shift.split '|'
    farewell = text.pop
    raise 'Disconnection message expected for HeavyDB.' unless farewell =~ /^User .*? disconnected from database .*$/
    @time_ms = parseQueryTime text.pop  # E.g. 'Execution time: 65 ms, Total time: 66 ms'
    expected = if @header == [ 'No rows returned.' ]
      0
    else
      parseNrows(text.pop)  # E.g. '100 rows returned.'
    end
    @rows = text.map { |record| record.split '|' }
    raise "Error: nrows(#{expected}) but @rows.size(#{@rows.size})." unless @rows.size == expected
  end

 private

  # Returns the execution time in milliseconds as a Float.
  def parseQueryTime line
    found = line.match(/^Execution time: (\S+) ms,/)
    raise "Unexpected execution time message(#{line})." unless found
    found[1].to_f
  end

  # Returns the reported number of rows as an Integer.
  def parseNrows line
    found = line.match(/^(\d+) rows returned.$/)
    raise "Unexpected row count message(#{line})." unless found
    found[1].to_i
  end
end

# Parse PostgreSQL query output.
#
# Expected file layout: 'Timing is on.', the header line, the data rows, a row count line
# such as '(100 rows)' and a timing line such as 'Time: 12.5 ms'. Exposes the data rows as
# arrays of '|'-separated fields.
class PgResults
  attr_reader :rows

  # Reads and validates +filename+; raises RuntimeError when the layout is not as expected.
  def initialize filename
    puts "Parsing filename(#{filename})."
    #text = File.readlines filename, chomp: true
    text = File.readlines(filename, encoding: 'UTF-8').map(&:chomp)  # ruby 2.0
    banner = text.shift
    raise '\timing is expected on for all benchmark PostgreSQL queries.' unless banner == 'Timing is on.'
    @header = text.shift.split '|'
    @time_ms = parseQueryTime text.pop  # E.g. 'Time: 2119861.172 ms (35:19.861)'
    expected = parseNrows text.pop  # E.g. '(100 rows)'
    adjustOrdering text
    @rows = text.map { |record| record.split '|' }
    raise "Error: nrows(#{expected}) but @rows.size(#{@rows.size})." unless @rows.size == expected
  end

 private

  # Swaps two adjacent result lines in place so that their order matches HeavyDB's.
  def adjustOrdering lines
    # TPC-DS Query 65 - PostgreSQL sorts ',' before ' ' but HeavyDB is the reverse.
    at = lines.find_index { |line| line.start_with? 'ation|Local,' }
    return unless at
    successor = lines[at+1]
    return unless successor && successor.start_with?('ation|Local ')
    lines[at, 2] = [ successor, lines[at] ]
  end

  # Returns the query time in milliseconds as a Float.
  def parseQueryTime line
    found = line.match(/^Time: (\S+) ms\b/)  # Time: 2119861.172 ms (35:19.861)
    raise "Unexpected execution time message(#{line})." unless found
    found[1].to_f
  end

  # Returns the reported number of rows as an Integer.
  def parseNrows line
    found = line.match(/^\((\d+) rows?\)$/)
    raise "Unexpected row count message(#{line})." unless found
    found[1].to_i
  end
end

# Track PostgreSQL column types of a query to know how values should be compared.
#
# The types are read from a cached '<query>-pg.view' file next to the query; when that file
# does not exist yet it is produced by creating a temporary view from the query in PostgreSQL
# and describing it.
class PgColumns
  # Text that is a plain decimal/float literal and can therefore be compared numerically.
  NUMERIC_LITERAL = /\A\s*[-+]?(\d+(\.\d*)?|\.\d+)([eE][-+]?\d+)?\s*\z/

  # Array of PostgreSQL type names, one per result column.
  attr_reader :types

  # filename: path of the query's .sql file; dbname: PostgreSQL database/user name.
  def initialize filename, dbname
    @types = getColumnTypes filename, dbname
  end

  # Returns true when the cell at (row, col) of pg and hv is considered equal for the
  # column's type. pg and hv must respond to #rows with an array of arrays of strings.
  def equal? row, col, pg, hv
    # String#split drops trailing empty fields, so a row may be shorter than the column
    # list; a missing cell stands for an empty string.
    pg_str, hv_str = pg.rows[row][col].to_s, hv.rows[row][col].to_s
    type = @types[col]
    if type == 'numeric'
      numericEqual? pg_str, hv_str
    elsif type =~ /^character( varying)?\(\d+\)$/
      pg_str.rstrip == hv_str.rstrip  # PostgreSQL keeps trailing space on character() columns.
    else
      pg_str == hv_str
    end
  end

 private

  # Compares two numeric cells with a tolerance derived from the HeavyDB value's precision.
  def numericEqual? pg_str, hv_str
    # Non-numeric text such as NULL must match literally; String#to_f would turn it into 0.0
    # and make NULL compare equal to 0.
    return pg_str == hv_str unless pg_str =~ NUMERIC_LITERAL && hv_str =~ NUMERIC_LITERAL
    # decimal division in HeavyDB is broken since it doesn't extend the number of decimal places so we
    # accommodate this by checking if pg_str has same or greater number of significant figures as hv_str.
    _, frac = hv_str.split '.'
    if frac.nil?
      pg_str.to_f == hv_str.to_f
    else
      # Example: pg(160.81166667) hv(160.81167)
      # Check pg value is within error of hv.
      err = 10.0 ** -frac.size
      pg_str.to_f.between? hv_str.to_f - err, hv_str.to_f + err
    end
  end

  # Returns the column types of the query in filename, using and filling the '-pg.view' cache.
  # Raises RuntimeError when the view description does not have the expected header.
  def getColumnTypes filename, dbname
    columns_file = filename.sub(/\.sql$/, '-pg.view')
    if !File.exist?(columns_file)
      queries = "CREATE OR REPLACE TEMPORARY VIEW temp_view AS #{File.read(filename)} \\d temp_view"
      IO.popen("#{PSQL} #{dbname} --no-align", 'r+') do |psql|
        psql.puts queries
        psql.close_write
        File.open(columns_file, 'w') { |output| IO.copy_stream psql, output }
      end
    end
    lines = File.readlines(columns_file).map(&:chomp)  # ruby 2.0
    lines[2] == 'Column|Type|Collation|Nullable|Default' or raise "Unexpected response (#{lines}) for #{filename}."
    # The type is the second '|'-separated field of each line after the header.
    lines[3...lines.size].map { |line| line[/\|(.*?)\|/, 1] }
  end
end

# Write out single HTML report based on all query*.json results.
#
# Each json file is expected to hold an object with 'success' and, depending on the outcome,
# 'time_ms' and 'nrows' (successful queries) or 'message' (failed queries).
class Reporter
  # json_files: paths of the per-query report files, named query<N>.json.
  def initialize json_files
    @json_files = json_files
  end

  # Returns +message+ as HTML: markup characters are escaped ('&' first so the entities
  # produced for '<' and '>' are not escaped again) and both real newlines and literal
  # backslash-n sequences become line breaks. A missing message yields an empty string.
  def format message
    escaped = message.to_s.gsub('&','&amp;').gsub('<','&lt;').gsub('>','&gt;')
    escaped.gsub("\n","<br/>\n").gsub('\n',"<br/>\n")
  end

  # Writes the complete HTML page to +output+ (anything responding to #puts): one table
  # with the successful queries followed by one with the failed queries, each in the order
  # of the json files given to the constructor.
  def write_to output
    # Parse every report once: [query number, parsed json].
    reports = @json_files.map do |filename|
      [ filename[/query(\d+)\.json$/,1].to_i, File.open(filename) { |file| JSON.load file } ]
    end
    passed, failed = reports.partition { |_n, json| json['success'] }
    output.puts <<-HTML
<html>
<head>
<style>
body { background-color: white; }
table, th, td {
  border: 1px solid black;
  border-collapse: collapse;
}
tr:nth-child(even) { background-color: #D6EEEE; }
td { text-align: right; }
table.failed td:nth-child(2) { text-align: left; }
</style>
</head>

<body>

<h1>TPC-DS Queries</h1>

<h3>Successful Queries</h3>

<table>
<tr><th>Query #</th><th>Time (ms)</th><th>Rows</th></tr>
HTML
    passed.each do |n, json|
      output.puts '<tr><td>%s</td><td>%s</td><td>%s</td></tr>' % [ n, json['time_ms'], json['nrows'] ]
    end
    output.puts <<-HTML
</table>

<h3>Failed Queries</h3>

<table class="failed">
<tr><th>Query #</th><th>Message</th></tr>
HTML
    failed.each do |n, json|
      output.puts '<tr><td>%s</td><td>%s</td></tr>' % [ n, format(json['message']) ]
    end
    output.puts <<-HTML
</table>

</body>
</html>
HTML
  end
end
